package handler

import (
	"context"
	"github.com/apache/rocketmq-client-go/v2/consumer"
	"github.com/apache/rocketmq-client-go/v2/primitive"
	"github.com/go-redsync/redsync/v4"
	"github.com/go-redsync/redsync/v4/redis/goredis/v8"
	"go.uber.org/zap"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
	"google.golang.org/protobuf/types/known/emptypb"
	. "service/inventory_srv/global"
	"service/inventory_srv/model"
	"service/inventory_srv/proto"
	"strconv"
)

// 实现商品服务（service层）的接口
// 接口定义文件——goods.proto

type InventoryServer struct {
	// 用以忽略接口内部实现逻辑，从而快速启动grpc服务测试，准备上线时必须将这行代码注释掉
	//proto.UnimplementedInventoryServer
}

/*
SetInv(context.Context, *GoodsInvInfo) (*emptypb.Empty, error)
InvDetail(context.Context, *GoodsInvInfo) (*GoodsInvInfo, error)
Sell(context.Context, *SellInfo) (*emptypb.Empty, error)
Reback(context.Context, *SellInfo) (*emptypb.Empty, error)
*/

// 设置库存
func (g *InventoryServer) SetInv(c context.Context, r *proto.GoodsInvInfo) (*emptypb.Empty, error) {
	inv := new(model.Inventory)
	inv.Goods = r.GoodsId
	// 去库存表中查找goodsId为r.GoodsId的记录
	// 如果找到该条记录的话，inv的id会被赋值，后续Save执行的实际上是更新操作；
	// 如果没有找到该条记录，inv的id不会被赋值，后续Save执行的实际上是添加操作。
	// 所以这里不用处理错误
	DB.Where(inv).First(inv)
	inv.Stocks = r.Num

	// 开启事务
	tx := DB.Begin()
	// 未找到商品的库存记录则新增，找到就修改，没有去检验商品表中的商品id是否存在，因为这个数据库与商品数据库是隔离的
	if rs := tx.Save(inv); rs.Error != nil {
		tx.Rollback()
		return &emptypb.Empty{}, rs.Error
	}
	tx.Commit()
	return &emptypb.Empty{}, nil
}

// 根据商品id获取库存信息
func (g *InventoryServer) InvDetail(c context.Context, r *proto.GoodsInvInfo) (*proto.GoodsInvInfo, error) {
	inv := new(model.Inventory)
	inv.Goods = r.GoodsId
	if res := DB.Where(inv).First(inv); res.Error != nil {
		return nil, status.Errorf(codes.InvalidArgument, "未找到该商品的库存信息")
	}
	return &proto.GoodsInvInfo{GoodsId: inv.Goods, Num: inv.Stocks}, nil
}

//// 扣减商品库存
//// 分布式锁：【mysql悲观锁】
//func (g *InventoryServer) Sell(c context.Context, r *proto.SellInfo) (*emptypb.Empty, error) {
//	tx := DB.Begin()
//	inv := new(model.Inventory)
//	// 这里可以进一步优化：改成批量查询、批量更新
//	for _, goodInfo := range r.GoodsInfo {
//		*inv = model.Inventory{}
//		//if res := DB.Where("goods=?", goodInfo.GoodsId).First(inv); res.RowsAffected == 0 {
//		if res := tx.Clauses(clause.Locking{Strength: "UPDATE"}).Where("goods=?", goodInfo.GoodsId).First(inv); res.RowsAffected == 0 {
//			tx.Rollback()
//			return nil, status.Errorf(codes.InvalidArgument, "没有库存信息")
//		}
//
//		if inv.Stocks < goodInfo.Num {
//			tx.Rollback()
//			return nil, status.Errorf(codes.ResourceExhausted, "库存不足")
//		}
//
//		// 扣减
//		inv.Stocks -= goodInfo.Num
//		tx.Save(inv)
//	}
//	tx.Commit()
//	return &emptypb.Empty{}, nil
//}

//// 扣减商品库存
//// 分布式锁：mysql乐观锁
//func (g *InventoryServer) Sell(c context.Context, r *proto.SellInfo) (*emptypb.Empty, error) {
//	tx := DB.Begin()
//	inv := new(model.Inventory)
//	// 这里可以进一步优化：改成批量查询、批量更新
//	for _, goodInfo := range r.GoodsInfo {
//		// 乐观锁核心逻辑
//		for ; ; {
//			*inv = model.Inventory{}
//			if res := DB.Where("goods=?", goodInfo.GoodsId).First(inv); res.RowsAffected == 0 {
//				tx.Rollback()
//				return nil, status.Errorf(codes.InvalidArgument, "没有库存信息")
//			}
//
//			if inv.Stocks < goodInfo.Num {
//				tx.Rollback()
//				return nil, status.Errorf(codes.ResourceExhausted, "库存不足")
//			}
//
//			res := tx.
//				Model(&model.Inventory{}).
//				Where("goods=? and version=?", goodInfo.GoodsId, inv.Version).
//				Select("stocks", "version").
//				Updates(model.Inventory{
//					Stocks:    inv.Stocks - goodInfo.Num,
//					Version:   inv.Version + 1,
//				})
//			if res.RowsAffected == 1 {
//				break
//			}
//		}
//	}
//	tx.Commit()
//	return &emptypb.Empty{}, nil
//}

// 扣减商品库存
// 分布式锁：redsync
func (g *InventoryServer) Sell(c context.Context, r *proto.SellInfo) (*emptypb.Empty, error) {
	// redsync分布式锁
	var mutexList []*redsync.Mutex
	if RedisClient == nil {
		zap.S().Error("连接redis出错")
		return nil, status.Errorf(codes.Internal, "连接redis出错")
	}

	tx := DB.Begin()
	inv := new(model.Inventory)

	// 库存扣减详情
	sellDetail := &model.StockSellDetail{
		OrderSn: r.OrderSn,
		// 初始化时定义订单状态为"已扣减"
		Status: 1,
	}
	var details []model.GoodsDetail

	// 这里可以进一步优化：改成批量查询、批量更新
	for _, goodInfo := range r.GoodsInfo {
		*inv = model.Inventory{}
		// 获取redsync锁
		pool := goredis.NewPool(RedisClient)
		rs := redsync.New(pool)
		mutexname := "[redsync]goods:" + strconv.FormatInt(int64(goodInfo.GoodsId), 10)
		mutex := rs.NewMutex(mutexname)
		//mutex := rs.NewMutex(mutexname,
		//	redsync.WithExpiry(8), // 设置锁的自动过期时间
		//	redsync.WithTries(32), // 设置重试次数
		//	redsync.WithRetryDelay(500*time.Microsecond), // 设置获取锁失败后直到下次重试的等待时间
		//)
		mutexList = append(mutexList, mutex)
		// 测试开始运行时，所有的g都开始争夺锁，每个g默认的等待时间是16秒，如果任务过多导致所有g的执行时间超过了16秒，
		// 那么势必会有一个g因为等待时间过长而获取不到锁报错，这是正常的情况。
		if err := mutex.Lock(); err != nil {
			tx.Rollback()
			zap.S().Errorf("[redsync]获取全局锁失败:%v\n", err.Error())
			return nil, status.Errorf(codes.Internal, "获取全局锁失败")
		}

		if res := DB.Where("goods=?", goodInfo.GoodsId).First(inv); res.RowsAffected == 0 {
			tx.Rollback()
			zap.S().Error("没有库存信息")
			return nil, status.Errorf(codes.InvalidArgument, "没有库存信息")
		}

		if inv.Stocks < goodInfo.Num {
			tx.Rollback()
			zap.S().Error("库存不足")
			return nil, status.Errorf(codes.ResourceExhausted, "库存不足")
		}

		preStocks := inv.Stocks

		// 扣减
		inv.Stocks -= goodInfo.Num
		if res := tx.Save(inv); res.Error != nil {
			tx.Rollback()
			zap.S().Errorf("执行扣减库存失败:%v", res.Error)
			return nil, status.Errorf(codes.Internal, "执行扣减库存失败:%v", res.Error)
		}

		// 记录订单中每件商品扣减之前的库存，方便后续库存归还
		details = append(details, model.GoodsDetail{
			Goods: goodInfo.GoodsId,
			Num:   preStocks,
		})
	}

	sellDetail.Detail = details
	if res := tx.Create(sellDetail); res.Error != nil || res.RowsAffected != 1 {
		tx.Rollback()
		zap.S().Errorf("记录库存扣减详情失败:%v", res.Error)
		return nil, status.Errorf(codes.Internal, "记录库存扣减详情失败:%v", res.Error)
	}

	tx.Commit()

	//释放锁必须放置在提交事务之后，这是一定的。做到了这一点，就不会引发数据不一致的问题，
	//而老师给的代码将释放锁放在了commit之前，那样一定是错误的做法。
	//千万注意：在for循环中同时开启了几个锁（实际上是同时设置了多个key），
	//这里就要释放几个锁，否则其他锁就只能等待过期后再释放，这就会浪费8秒（默认过期时间为8秒）时间，
	//做到了这一点，就不会出现获取锁等待时间过长的问题。
	//释放redsync锁。
	for _, mutex := range mutexList {
		if ok, err := mutex.Unlock(); !ok || err != nil {
			tx.Rollback()
			zap.S().Error("[redsync]释放锁失败")
			return nil, status.Errorf(codes.Internal, "释放锁失败")
		}
	}
	return &emptypb.Empty{}, nil
}

// 处理rocketMQ中的消息，自动归还库存。
// 既然是归还库存，那么就应该知道每件商品应该归还多少。
// 这个接口应该确保幂等性，不能因为消息的重复发送导致一个订单的库存归还多次，
// 另外，没有扣减的库存也不可以执行归还。
// 如何确保这些都没有问题？解决办法是新建一张表，这张表需要详细记录订单的库存扣减以及归还细节。
func AutoReback(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
	for _, msg := range msgs {
		// 获取rmq中的消息——订单编号
		orderSn := string(msg.Body)
		zap.S().Debug("消费端读到消息：", orderSn)
		// 查询订单对应的库存扣减详情表
		stockSellDetail := new(model.StockSellDetail)

		res := DB.Where(model.StockSellDetail{OrderSn: orderSn}).First(stockSellDetail)
		if res.RowsAffected == 1 && stockSellDetail.Status == 1 {
			// 若在库存扣减详情表中可以找到该订单，并且扣减状态是"已扣减"，则应归还库存

			// 归还库存
			// insert into inventory (goods,stocks) values (55,666),(56,666),(57,666) on duplicate key update stocks=values(stocks);
			buffer := make([]byte, 0, 512)
			buffer = append(buffer, "insert into inventory (goods,stocks) values "...)
			for _, detail := range stockSellDetail.Detail {
				buffer = append(buffer, '(')
				buffer = append(buffer, strconv.FormatInt(int64(detail.Goods), 10)...)
				buffer = append(buffer, ',')
				buffer = append(buffer, strconv.FormatInt(int64(detail.Num), 10)...)
				buffer = append(buffer, "),"...)
			}
			// 去掉末尾的逗号
			buffer = buffer[:len(buffer)-1]
			buffer = append(buffer, " on duplicate key update stocks=values(stocks);"...)
			tx := DB.Begin()

			sqlStr := string(buffer)
			zap.S().Debug("库存归还sql：", sqlStr)

			// 执行sql语句，归还库存
			if updateRes := tx.Exec(sqlStr); updateRes.Error != nil {
				tx.Rollback()
				zap.S().Error("归还库存的sql执行出错", updateRes.Error)
				// 这里必须归还库存，所以执行出错应该稍后重试消费
				return consumer.ConsumeRetryLater, updateRes.Error
			}

			// 更新库存扣减详情表中对应记录的状态为"已归还"，防止重复归还
			stockSellDetail.Status = 2

			result := tx.Where(model.StockSellDetail{OrderSn: orderSn}).Select("status").Updates(stockSellDetail)
			if result.Error != nil {
				tx.Rollback()
				return consumer.ConsumeRetryLater, status.Errorf(codes.Internal, result.Error.Error())
			}
			tx.Commit()

			// 将消息消费，rmq中不再有该消息
			return consumer.ConsumeSuccess, nil
		} else {
			zap.S().Debug("不满足归还条件，直接将消息丢弃：", orderSn)
			// 不满足归还条件，直接消费该消息，不执行库存归还
			return consumer.ConsumeSuccess, nil
		}
	}
	return consumer.ConsumeSuccess, nil
}
